跳到主要内容

SpringBoot 整合 RabbitMQ

大部分内容直接转自 Springboot 整合RabbitMq ,用心看完这一篇就够了(写的超级棒)

配置环境

<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

本次实例教程需要创建2个springboot项目,一个 rabbitmq-provider (生产者),一个 rabbitmq-consumer(消费者)。

这里再创建一个 Virtual Host,方便删除

直连型交换机使用例

创建生产者

注意:下面的都是沿用这套环境

编写配置文件

rabbitmq-provider 的配置

server:
port: 8001

spring:
application:
name: "rabbitmq-provider"
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
# 虚拟host 可以不设置 它会使用服务默认的 host
virtual-host: "myHost"

先使用下 direct exchange(直连型交换机)

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

/* 队列 起名:TestDirectQueue */
@Bean
public Queue testDirectQueue() {
// 这个 Queue 后面三个配置项
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);

//一般设置一下队列的持久化就好,其余两个就是默认 false
return new Queue("TestDirectQueue",true);
}

/* Direct交换机 起名:TestDirectExchange */
@Bean
DirectExchange testDirectExchange() {
// 这个 DirectExchange 后面两个配置项
// durable:是否持久化,默认是 false, 如果为 true 该交换将在服务器重新启动后继续存在
// autoDelete:是否自动删除,如果没有服务在使用这个交换机时自动删除
return new DirectExchange("TestDirectExchange",true,false);
}

/* 绑定 将上面创建的队列与交换机绑定, 并设置用于匹配键:TestDirectRouting */
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("TestDirectRouting");
}


@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}

然后写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求)

@RestController
public class SendMessageController {

@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法

@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);

//将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}

打开 cmd

curl http://localhost:8001/sendDirectMessage

因为目前还没弄消费者 rabbitmq-consumer,消息没有被消费的,可以去 rabbitMq 管理页面看看,是否推送成功:

可以看到 SpringBoot 自动创建了上面使用 Bean 注入的两个交换机

点进这个 TestDirectExchange 可以看到它绑定了 TestDirectQueue

检查这个队列可以发现刚才发送的消息

创建消费者

注意:下面的都是沿用这套环境

创建一个 rabbitmq-consumer 项目

配置文件和上基本相同

server:
port: 8002
spring:
application:
name: "rabbitmq-consumer"
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
# 虚拟host 可以不设置 它会使用服务默认的 host
virtual-host: "myHost"

创建一个监听器就能收到生产者发过来的数据了

@Component
@RabbitListener(queues = "TestDirectQueue")// 监听的队列名称 TestDirectQueue
public class DirectReceiver {

/* @RabbitHandler 注解用于标记当前方法可以取得消息队列的内容 */
@RabbitHandler
public void process(Map<String,Object> testMessage) {
System.out.println("DirectReceiver 消费者收到消息 : " + testMessage.toString());
}
}

然后将 rabbitmq-consumer 项目运行起来,可以看到把之前推送的那条消息消费下来了:

那么直连交换机既然是一对一,那如果配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?

创建多一个监听器

可以看到是实现了轮询的方式对消息进行消费,而且不存在重复消费。

如何监听消息

这部分转载自 RabbitMQ 学习笔记 -- 07 初探@RabbitListener

有了上述的内容,大概知道了 RabbitMQ 是如何在 SpringBoot 中使用的,在继续学习之前先来看下 RabbitMQ 的消息监听方法

在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

MessageProperties // 消息属性
byte[] body // 消息内容

如下使用 Message 类型接收数据,当监听到队列 hello 中有消息时则会进行接收并处理,MessageConvert 会直接转换成消息类型,并绑定在对应被注解的方法中。

消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

//监听这个 hello 队列
@RabbitListener(queues = "hello")
public void receive1(Message message) {    
// 消息内容,二进制数据    
MessageProperties properties = message.getMessageProperties();    
byte[] body = message.getBody();
}

MessageConvert

涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析

RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等

消息转换器 MessageConverter 重要的两个方法

// 将java对象和属性对象转换成 Message 对象。
Message toMessage(Object object, MessageProperties messageProperties); 
// 将消息对象转换成 java 对象。
Object fromMessage(Message message) throws MessageConversionException;

@RabbitListener

@RabbitListener 注解的方法中,使用 @Payload@Headers 注解可以获取消息中的 body 和 headers 消息。它们都会被 MessageConvert 转换器解析转换后(使用 fromMessage 方法进行转换),将结果绑定在对应注解的方法中。

/**
* 这里是监听 hello 队列,并将接收到的消息体 body 根据 MessageConvert 转换器转换成 String 类型输出
* @Headers 获取所有头部属性消息,也可以用 @Header 获取单个 header 消息
*/
@RabbitListener(queues = "hello")
public void processMessage1(@Payload String message, 
                            @Headers Map<String,Object> headers, 
                            @Header(value = AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
    System.out.println("message:" + message);
    System.out.println("Headers:" + headers);
    System.out.println("routingKey: " + routingKey);
}

默认消费者消费时,消息的 content_type 属性表示消息 body 数据以什么数据格式传输存储,直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

application/octet-stream:二进制字节数组存储,使用 byte[]
application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
text/plain:文本数据类型存储,使用 String
application/json:JSON 格式,使用 Object、相应类型

自定义 MessageConverter

自定义 MessageConverter 方法,进行消费消息的处理

如下是自定义针对 fromMessage 方法,将消费的消息转换成 TestA 对象

@Configuration
public class RabbitMQConfig {
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置自定义的 MessageConverter
        factory.setMessageConverter(new MessageConverter() {
            @Override
            public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
                return null;
            }

            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){
                    // 返回自定义的对象
                    return (TestA)ois.readObject();
                }catch (Exception e){
                    e.printStackTrace();
                    return null;
                }
            }
        });
        return factory;
    }
}
class TestA implements Serializable {
    String fieldA;
    String fieldB;
    // 省略 Get Set ToString
}

生产者:

@Test
public void producer() {
    String routingKey = "hello";
    TestA a = new TestA();
    a.setFieldA("FBI WARNING");
    rabbitTemplate.convertAndSend(routingKey, a);
    System.out.println("发送成功");
}

消费者:

@Component
@RabbitListener(queues = "hello")
public class receiver {
    @RabbitHandler
    public void processMessage1(TestA test) {
        System.out.println(test.getFieldA());
    }
}
输出:FBI WARNING

@RabbitHandler

@RabbitListener@RabbitHandler 搭配使用

  • @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。
  • @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 注解的方法进行分发处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

消费者:

@Component
@RabbitListener(queues = "hello")
public class consumer {

    @RabbitHandler
    public void receive(byte[] msg) {
        System.out.println("byte[] 消费者消费信息:" + new String(msg));
    }
    @RabbitHandler
    public void receive(String msg) {
        System.out.println("String 消费者消费信息:" + msg);
    }
    @RabbitHandler
    public void receive(Integer msg) {
        System.out.println("Integer 消费者消费信息:" + msg);
    }
    @RabbitHandler
    public void receive(TestA a) {
        System.out.println("对象 TestA 消费者消费信息:" + a.toString());
    }
}

class TestA implements Serializable {
    String fieldA;
    String fieldB;
    // 省略 Get Set ToString
}

生产者:

@Test
public void demo_06_Producer() {
    String routingKey = "hello";
    String str = "FBI OPEN THE DOOR";
    Integer i = 102424;
    TestA a = new TestA();
    a.setFieldA("FBI WARNING");

    // rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.convertAndSend(routingKey, a);
    rabbitTemplate.convertAndSend(routingKey, i);
    rabbitTemplate.convertAndSend(routingKey, str);
    System.out.println("发送成功");
}

输出:

对象 TestA 消费者消费信息:"FBI WARNING

Integer 消费者消费信息:102424

String 消费者消费信息:FBI OPEN THE DOOR

这里生产者在发送消息时,调用了 RabbitTemplate 中的 convertAndSend 方法会使用 MessageConvert 对 TestA 对象进行消息的序列化,其默认的实现也是 SimpleMessageConverter

消息确认机制

和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。

消息接收的模式

消息接收的确认机制主要存在两种模式:

1、自动确认, 这也是默认的消息确认情况。 AcknowledgeMode.NONE

RabbitMQ 成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。

所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。 一般这种情况都是使用 try catch 捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

2、手动确认, 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。

消费者收到消息后,手动调用 basic.ack basic.nack basic.reject 后,RabbitMQ收到这些消息后,才认为本次投递成功。

basic.ack 用于肯定确认 basic.nack 用于否定确认(注意:这是 AMQP 0-9-1 的 RabbitMQ 扩展) basic.reject 用于否定确认,但与 basic.nack 相比有一个限制:一次只能拒绝单条消息

Reject 拒绝

channel.basicReject(deliveryTag, true);  

拒绝消费当前消息,如果第二参数传入 true,就是将数据重新丢回队列里,那么下次还会消费这消息。

设置 false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。

使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch 异常再拒绝入列,选择是否重入列。

但是如果使用不当会导致一些每次都被重入列的消息一直

消费 > 入列 > 消费 > 入列

这样循环,会导致消息积压。

Nack 拒绝

channel.basicNack(deliveryTag, false, true);

第一个参数依然是当前消息到的数据的唯一 id;

第二个参数是指是否针对多条消息;如果是 true,也就是说一次性针对当前通道的消息的 tagID 小于当前这条消息的,都拒绝确认。

第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。

同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

具体使用例

在消费者这端添加一个监听配置

@Configuration
public class MessageListenerConfig {

@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MyAckReceiver myAckReceiver; //消息接收处理类

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置一个队列
container.setQueueNames("TestDirectQueue");
// 如果同时设置多个如下: 前提是队列都是必须已经创建存在的
// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");


//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
// container.setQueues(new Queue("TestDirectQueue",true));
// container.addQueues(new Queue("TestDirectQueue2",true));
// container.addQueues(new Queue("TestDirectQueue3",true));
container.setMessageListener(myAckReceiver);

return container;
}

}

对应的手动确认消息监听类,MyAckReceiver.java(手动确认模式需要实现 ChannelAwareMessageListener)

之前配置的监听器可以先注释掉了,因为这个也是一个监听器

@Configuration
public class MyAckReceiver implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 因为传递消息的时候用的 map 传递,所以将 Map 从 Message 内取出需要做些处理
String msg = message.toString();
String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
Map<String, String> msgMap = mapStringToMap(msgArray[1].trim());
String messageId = msgMap.get("messageId");
String messageData = msgMap.get("messageData");
String createTime = msgMap.get("createTime");
System.out.println(" MyAckReceiver messageId:" + messageId + " messageData:" + messageData + " createTime:" + createTime);
System.out.println("消费的主题消息来自:" + message.getMessageProperties().getConsumerQueue());

// 上面只是处理消息,下面这一段才是关键
//第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 第二个参数,true 会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
channel.basicReject(deliveryTag, true);
e.printStackTrace();
}
}

// {key=value,key=value,key=value} 格式转换成map
private Map<String, String> mapStringToMap(String str) {
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",", 3);
Map<String, String> map = new HashMap<>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}

Topic Exchange 主题交换机

这里沿用上面 直连型交换机使用例 的配置环境

配置生产者

创建一个 TopicRabbitConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicRabbitConfig {
// 绑定键
public static final String MAN = "topic.man";
public static final String WOMAN = "topic.woman";

@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.MAN);
}

@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.WOMAN);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}


// 将 firstQueue 和 topicExchange 绑定,而且绑定的键值为 topic.man
// 这样只要是消息携带的路由键是 topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(MAN);
}

// 将 secondQueue 和 topicExchange 绑定,而且绑定的键值为用上通配路由键规则 topic.#
// 这样只要是消息携带的路由键是以 topic. 开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}

}

然后添加多2个接口,用于推送消息到主题交换机:

@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();

manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);

rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}

@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();

womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);

rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}

配置消费者

同样创建两个监听器,一个专门监听这个 topic.man 队列,一个监听 topic.woman 队列

注意,得先等上面的生产者先执行,得让它先在 RabbitMQ 里面创建那个交换机与队列,然后发送一个消息它才会创建这个队列,否则先执行这个监听会报错

@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {

@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicManReceiver消费者收到消息 : " + testMessage.toString());
}
}
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {

@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicTotalReceiver消费者收到消息 : " + testMessage.toString());
}
}

先发送一个 sendTopicMessage1

curl http://localhost:8001/sendTopicMessage1

这时检查控制台,可以发现两个队列都收到了这个消息

再发送一个 sendTopicMessage2

curl http://localhost:8001/sendTopicMessage2

可以发现只有一个接收到了这个消息

Fanout Exchang 扇型交换机

这个 Fanout Exchang 扇型交换机其实就是广播模式

配置生产者

@Configuration
public class FanoutRabbitConfig {

/**
* 创建三个队列 :fanout.A fanout.B fanout.C
* 将三个队列都绑定在交换机 fanoutExchange 上
* 因为是扇型交换机, 路由键无需配置,配置也不起作用
*/


@Bean
public Queue queueA() {
return new Queue("fanout.A");
}

@Bean
public Queue queueB() {
return new Queue("fanout.B");
}

@Bean
public Queue queueC() {
return new Queue("fanout.C");
}

// 这里只创建一个 Fanout Exchang 扇型交换机。下面都是绑定它和队列们
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}

@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}

@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}

@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}

然后是写一个接口用于推送消息

@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: testFanoutMessage ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();

map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);

rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}

配置消费者

因为有三个队列,所以对应的也需要三个监听器

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString());
}

}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverB消费者收到消息 : " +testMessage.toString());
}

}
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverC消费者收到消息 : " +testMessage.toString());
}

}

发送一条数据

curl http://localhost:8001/sendFanoutMessage

查看控制台发现三个队列都收到了消息

使用注解的方式绑定交换机

上面那样直接在 @RabbitListener 注解写上要监听的队列名有个坏处,就是如果不存在这个队列就会报错

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString());
}

}

而使用下面注解绑定的好处是,无需等待生产者创建这个队列,即使不存在也不会报错

// 使用 RabbitListener 注解监听队列,也可以直接写在方法上
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false", autoDelete = "true"))
public class HelloCustomer {
@RabbitHandler
public void customer01(String message){
System.out.println("message is:" + message);
}
}

绑定一个扇形交换机

@Component
public class FanoutCustomer {
// 消费者01
@RabbitListener(bindings = {
@QueueBinding(
// 不写名字就是生成一个临时队列
value = @Queue,
// 绑定一个交换机
exchange = @Exchange(value = "logs",type = "fanout"))
})
public void customer01(String message){
System.out.println("这是消费者1:" + message);
}

// 消费者02
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs",type = "fanout"))
})
public void customer02(String message){
System.out.println("这是消费者2:" + message);
}
}

绑定一个直连交换机

@Component
public class RouteCustomer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs",type = "direct"),
key = {"info","error","warn"}
)
})
public void customer01(String message){
System.out.println("消费者01"+ message);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs",type = "direct"),
key = {"*"}
)
})
public void customer02(String message){
System.out.println("消费者02"+ message);
}
}